package main

import (
	"context"
	"fmt"
	"go.etcd.io/etcd/clientv3"
	"go.etcd.io/etcd/mvcc/mvccpb"
	"math/rand"
	"time"
)

func main() {
	var (
		config clientv3.Config
	)

	config = clientv3.Config{
		Endpoints:   []string{"127.0.0.1:2379"},
		DialTimeout: 5 * time.Second, // 超时时间
	}

	// 建立连接
	client, err := clientv3.New(config)
	if err != nil {
		fmt.Println(err)
		return
	}

	// KV
	kv := clientv3.NewKV(client)

	//模拟etcd中kv的变化
	go func() {
		rand.NewSource(time.Now().UnixNano())
		for {
			kv.Put(context.TODO(), "/cron/jobs/job7", fmt.Sprintf("%d", time.Now().UnixNano()))
			time.Sleep(1 * time.Second)
			kv.Delete(context.TODO(), "/cron/jobs/job7")
			time.Sleep(1 * time.Second)
		}
	}()

	getResp, err := kv.Get(context.TODO(), "/cron/jobs/job7")
	if err != nil {
		fmt.Println(err)
		return
	}

	if len(getResp.Kvs) != 0 {
		fmt.Println("当前值", string(getResp.Kvs[0].Value))
	}

	// 获取当前事务ID
	watchStartRevision := getResp.Header.Revision + 1

	// 创建一个watch
	watcher := clientv3.NewWatcher(client)

	// 启动监听
	fmt.Println("从该版本向后监听", watchStartRevision)

	ctx, cancelFunc := context.WithCancel(context.TODO())
	time.AfterFunc(5*time.Second, func() {
		cancelFunc()
	})

	watchRespChan := watcher.Watch(ctx, "/cron/jobs/job7", clientv3.WithRev(watchStartRevision))

	// 处理KV变化事件
	for watchResp := range watchRespChan {
		for _, ev := range watchResp.Events {
			switch ev.Type {
			case mvccpb.PUT:
				fmt.Println("修改", string(ev.Kv.Key), string(ev.Kv.Value))
				fmt.Println("Revision", ev.Kv.CreateRevision, ev.Kv.ModRevision)
			case mvccpb.DELETE:
				fmt.Println("删除", string(ev.Kv.Key), string(ev.Kv.Value))
				fmt.Println("Revision", ev.Kv.CreateRevision, ev.Kv.ModRevision)
			}
		}
	}

}
